package confirm;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.AMQP.BasicProperties;

@Slf4j
public class SendMsg {
    private static final String exchange_name = "test_exchange_topics1";
    public static void main(String[] args) throws Exception {
        Connection rabbitConn = ConnectUtils.getRabbitConn();

        // 创建通信管道
        Channel channel = rabbitConn.createChannel();


        // 准备数据
        Map<String,String> maps = new LinkedHashMap();
        maps.put("chaina.hubei.wuhan.20220227","香蕉bunana  武汉  20220227");
        maps.put("chaina.hubei.yichang.20220227","香蕉bunana  宜昌  20220227");
        maps.put("chaina.hubei.badong.20220227","香蕉bunana  巴东  20220227");
        maps.put("chaina.hubei.caidian.20220227","香蕉bunana  蔡甸  20220227");
        maps.put("chaina.hubei.xiantao.20220227","香蕉bunana  仙桃  20220227");
        maps.put("chaina.hubei.tianmen.20220227","香蕉bunana  天门  20220227");

        // 开启 confirm 监听模式
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 发送者向mq发送消息成功
                log.info("消息已被broker   接收，tagid为：{},multiple为：{}",deliveryTag,multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                // 发送者向mq发送消息失败
                log.info("消息已被broker   拒绝，tagid为：{},multiple为：{}",deliveryTag,multiple);
            }
        });

        // 增加return 监听
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return returnMessage) {
                log.info("====================");
                log.info("return编码：{}，return描述：{}", returnMessage.getReplyCode(),returnMessage.getReplyText());
                log.info("return 交换机：{}，return 路由key：{}", returnMessage.getExchange(),returnMessage.getRoutingKey());
                log.info("return 主题：{}", new String(returnMessage.getBody()));
                log.info("*********************");
            }
        });
        // 绑定交换机
        channel.exchangeDeclare(exchange_name, BuiltinExchangeType.TOPIC);

        // 发送数据
        Iterator<Map.Entry<String,String>> iterator = maps.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, String> next = iterator.next();
            Builder builder = new Builder();
            builder.deliveryMode(2);
            BasicProperties properties = builder.build();

            /**
             * 参数一：交换机名
             * 参数二：routingkey
             * 参数三：true  表示如果消息无法正常投递，则return给生产者 ；false 表示直接丢弃
             * 参数四：数据值
             */
            channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
        }
    }
}
